Skip to content

Commit 13a49c9

Browse files
fix and tests
1 parent 643a875 commit 13a49c9

File tree

2 files changed

+185
-0
lines changed

2 files changed

+185
-0
lines changed

Diff for: src/cmap/connection.ts

+4
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,8 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
237237
.on('error', this.onError.bind(this));
238238
this.socket.on('close', this.onClose.bind(this));
239239
this.socket.on('timeout', this.onTimeout.bind(this));
240+
241+
this.messageStream.pause();
240242
}
241243

242244
public get hello() {
@@ -651,6 +653,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
651653
private async *readMany(): AsyncGenerator<OpMsgResponse | OpReply> {
652654
try {
653655
this.dataEvents = onData(this.messageStream);
656+
this.messageStream.resume();
654657
for await (const message of this.dataEvents) {
655658
const response = await decompressResponse(message);
656659
yield response;
@@ -661,6 +664,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
661664
}
662665
} finally {
663666
this.dataEvents = null;
667+
this.messageStream.pause();
664668
this.throwIfAborted();
665669
}
666670
}

Diff for: test/unit/cmap/connection.test.ts

+181
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
1+
import { Socket } from 'node:net';
2+
13
import { expect } from 'chai';
24
import * as sinon from 'sinon';
5+
import { setTimeout } from 'timers/promises';
36

47
import {
58
connect,
69
Connection,
710
isHello,
811
MongoClientAuthProviders,
12+
MongoDBCollectionNamespace,
913
MongoNetworkTimeoutError,
1014
ns
1115
} from '../../mongodb';
@@ -142,4 +146,181 @@ describe('new Connection()', function () {
142146
expect(beforeHandshakeSymbol).to.be.a('symbol');
143147
expect(error).to.have.property(beforeHandshakeSymbol, true);
144148
});
149+
150+
describe('NODE-6370: regression test', function () {
151+
class MockSocket extends Socket {
152+
override write(_data: string | Buffer) {
153+
return false;
154+
}
155+
}
156+
157+
let socket: MockSocket;
158+
let connection: Connection;
159+
160+
this.timeout(10_000);
161+
162+
beforeEach(function () {
163+
socket = new MockSocket();
164+
connection = new Connection(socket, {});
165+
});
166+
167+
const validResponse = Buffer.from(
168+
'a30000002a0800004b010000dd07000000000000008e000000016f6b00000000000000f03f0324636c757374657254696d65005800000011636c757374657254696d65001c00000093f6f266037369676e61747572650033000000056861736800140000000072d8d6eab4e0703d2d50846e2db7adb5d2733cc4126b65794964000200000026f6f2660000116f7065726174696f6e54696d65001c00000093f6f26600',
169+
'hex'
170+
);
171+
172+
const chunks = [validResponse.slice(0, 10), validResponse.slice(10)];
173+
174+
describe('when data is emitted before drain', function () {
175+
describe('first command', function () {
176+
describe('when there is no delay between data and drain', function () {
177+
it('does not hang', async function () {
178+
const result$ = connection.command(
179+
MongoDBCollectionNamespace.fromString('foo.bar'),
180+
{ ping: 1 },
181+
{}
182+
);
183+
// there is an await in writeCommand, we must move the event loop forward just enough
184+
// so that we reach the `await drain`. Otherwise, we'll emit both data and drain before
185+
// listeners are attached.
186+
await setTimeout(0);
187+
188+
socket.emit('data', validResponse);
189+
socket.emit('drain');
190+
191+
await result$;
192+
});
193+
});
194+
195+
describe('when there is a delay between data and drain', function () {
196+
it('does not hang', async function () {
197+
const result$ = connection.command(
198+
MongoDBCollectionNamespace.fromString('foo.bar'),
199+
{ ping: 1 },
200+
{}
201+
);
202+
203+
// there is an await in writeCommand, we must move the event loop forward just enough
204+
// so that we reach the `await drain`. Otherwise, we'll emit both data and drain before
205+
// listeners are attached.
206+
await setTimeout(0);
207+
socket.emit('data', validResponse);
208+
209+
await setTimeout(10);
210+
211+
socket.emit('drain');
212+
await result$;
213+
});
214+
});
215+
216+
describe('when the data comes in multiple chunks', function () {
217+
it('does not hang', async function () {
218+
const result$ = connection.command(
219+
MongoDBCollectionNamespace.fromString('foo.bar'),
220+
{ ping: 1 },
221+
{}
222+
);
223+
224+
// there is an await in writeCommand, we must move the event loop forward just enough
225+
// so that we reach the `await drain`. Otherwise, we'll emit both data and drain before
226+
// listeners are attached.
227+
await setTimeout(0);
228+
socket.emit('data', chunks[0]);
229+
230+
await setTimeout(10);
231+
socket.emit('drain');
232+
233+
socket.emit('data', chunks[1]);
234+
235+
await result$;
236+
});
237+
});
238+
});
239+
240+
describe('not first command', function () {
241+
beforeEach(async function () {
242+
const result$ = connection.command(
243+
MongoDBCollectionNamespace.fromString('foo.bar'),
244+
{ ping: 1 },
245+
{}
246+
);
247+
248+
// there is an await in writeCommand, we must move the event loop forward just enough
249+
// so that we reach the `await drain`. Otherwise, we'll emit both data and drain before
250+
// listeners are attached.
251+
await setTimeout(0);
252+
socket.emit('drain');
253+
socket.emit('data', validResponse);
254+
255+
await result$;
256+
});
257+
258+
describe('when there is no delay between data and drain', function () {
259+
it('does not hang', async function () {
260+
const result$ = connection.command(
261+
MongoDBCollectionNamespace.fromString('foo.bar'),
262+
{ ping: 1 },
263+
{}
264+
);
265+
266+
// there is an await in writeCommand, we must move the event loop forward just enough
267+
// so that we reach the `await drain`. Otherwise, we'll emit both data and drain before
268+
// listeners are attached.
269+
await setTimeout(0);
270+
socket.emit('data', validResponse);
271+
272+
// await setTimeout(0);
273+
// await setTimeout(10);
274+
socket.emit('drain');
275+
await result$;
276+
});
277+
});
278+
279+
describe('when there is a delay between data and drain', function () {
280+
it('does not hang', async function () {
281+
const result$ = connection.command(
282+
MongoDBCollectionNamespace.fromString('foo.bar'),
283+
{ ping: 1 },
284+
{}
285+
);
286+
287+
// there is an await in writeCommand, we must move the event loop forward just enough
288+
// so that we reach the `await drain`. Otherwise, we'll emit both data and drain before
289+
// listeners are attached.
290+
await setTimeout(0);
291+
socket.emit('data', validResponse);
292+
293+
await setTimeout(10);
294+
// await setTimeout(10);
295+
socket.emit('drain');
296+
await result$;
297+
});
298+
});
299+
300+
describe('when the data comes in multiple chunks', function () {
301+
it('does not hang', async function () {
302+
const result$ = connection.command(
303+
MongoDBCollectionNamespace.fromString('foo.bar'),
304+
{ ping: 1 },
305+
{}
306+
);
307+
308+
// there is an await in writeCommand, we must move the event loop forward just enough
309+
// so that we reach the `await drain`. Otherwise, we'll emit both data and drain before
310+
// listeners are attached.
311+
await setTimeout(0);
312+
313+
socket.emit('data', chunks[0]);
314+
315+
await setTimeout(10);
316+
317+
socket.emit('drain');
318+
319+
socket.emit('data', chunks[1]);
320+
await result$;
321+
});
322+
});
323+
});
324+
});
325+
});
145326
});

0 commit comments

Comments
 (0)