Skip to content

Commit 8388443

Browse files
committed
fix(message-stream): support multiple inbound message packets
If there was enough data buffered to read multiple messages, the MessageStream would currently callback to the underlaying Writable callback multiple times. Also fixed here is that the callback would not be called in the event that there wasn't enough data to process a single message NODE-2437
1 parent fa4b01b commit 8388443

File tree

2 files changed

+92
-31
lines changed

2 files changed

+92
-31
lines changed

lib/cmap/message_stream.js

+42-29
Original file line numberDiff line numberDiff line change
@@ -39,32 +39,7 @@ class MessageStream extends Duplex {
3939
const buffer = this[kBuffer];
4040
buffer.append(chunk);
4141

42-
while (buffer.length >= 4) {
43-
const sizeOfMessage = buffer.readInt32LE(0);
44-
if (sizeOfMessage < 0) {
45-
callback(new MongoParseError(`Invalid message size: ${sizeOfMessage}`));
46-
return;
47-
}
48-
49-
if (sizeOfMessage > this.maxBsonMessageSize) {
50-
callback(
51-
new MongoParseError(
52-
`Invalid message size: ${sizeOfMessage}, max allowed: ${this.maxBsonMessageSize}`
53-
)
54-
);
55-
return;
56-
}
57-
58-
if (sizeOfMessage > buffer.length) {
59-
callback();
60-
return;
61-
}
62-
63-
const messageBuffer = buffer.slice(0, sizeOfMessage);
64-
buffer.consume(sizeOfMessage);
65-
66-
processMessage(this, messageBuffer, callback);
67-
}
42+
processIncomingData(this, callback);
6843
}
6944

7045
_read(/* size */) {
@@ -125,7 +100,36 @@ function canCompress(command) {
125100
return !uncompressibleCommands.has(commandName);
126101
}
127102

128-
function processMessage(stream, message, callback) {
103+
function processIncomingData(stream, callback) {
104+
const buffer = stream[kBuffer];
105+
if (buffer.length < 4) {
106+
callback();
107+
return;
108+
}
109+
110+
const sizeOfMessage = buffer.readInt32LE(0);
111+
if (sizeOfMessage < 0) {
112+
callback(new MongoParseError(`Invalid message size: ${sizeOfMessage}`));
113+
return;
114+
}
115+
116+
if (sizeOfMessage > stream.maxBsonMessageSize) {
117+
callback(
118+
new MongoParseError(
119+
`Invalid message size: ${sizeOfMessage}, max allowed: ${stream.maxBsonMessageSize}`
120+
)
121+
);
122+
return;
123+
}
124+
125+
if (sizeOfMessage > buffer.length) {
126+
callback();
127+
return;
128+
}
129+
130+
const message = buffer.slice(0, sizeOfMessage);
131+
buffer.consume(sizeOfMessage);
132+
129133
const messageHeader = {
130134
length: message.readInt32LE(0),
131135
requestId: message.readInt32LE(4),
@@ -142,7 +146,12 @@ function processMessage(stream, message, callback) {
142146
new ResponseType(stream.bson, message, messageHeader, messageBody, responseOptions)
143147
);
144148

145-
callback();
149+
if (buffer.length >= 4) {
150+
processIncomingData(stream, callback);
151+
} else {
152+
callback();
153+
}
154+
146155
return;
147156
}
148157

@@ -176,7 +185,11 @@ function processMessage(stream, message, callback) {
176185
new ResponseType(stream.bson, message, messageHeader, messageBody, responseOptions)
177186
);
178187

179-
callback();
188+
if (buffer.length >= 4) {
189+
processIncomingData(stream, callback);
190+
} else {
191+
callback();
192+
}
180193
});
181194
}
182195

test/unit/cmap/message_stream.test.js

+50-2
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,12 @@ const expect = require('chai').expect;
88

99
function bufferToStream(buffer) {
1010
const stream = new Readable();
11-
stream.push(buffer);
11+
if (Array.isArray(buffer)) {
12+
buffer.forEach(b => stream.push(b));
13+
} else {
14+
stream.push(buffer);
15+
}
16+
1217
stream.push(null);
1318
return stream;
1419
}
@@ -24,6 +29,31 @@ describe('Message Stream', function() {
2429
),
2530
documents: [{ ismaster: 1 }]
2631
},
32+
{
33+
description: 'valid multiple OP_REPLY',
34+
expectedMessageCount: 4,
35+
data: Buffer.from(
36+
'370000000100000001000000010000000000000000000000000000000000000001000000130000001069736d6173746572000100000000' +
37+
'370000000100000001000000010000000000000000000000000000000000000001000000130000001069736d6173746572000100000000' +
38+
'370000000100000001000000010000000000000000000000000000000000000001000000130000001069736d6173746572000100000000' +
39+
'370000000100000001000000010000000000000000000000000000000000000001000000130000001069736d6173746572000100000000',
40+
'hex'
41+
),
42+
documents: [{ ismaster: 1 }]
43+
},
44+
{
45+
description: 'valid OP_REPLY (partial)',
46+
data: [
47+
Buffer.from('37', 'hex'),
48+
Buffer.from('0000', 'hex'),
49+
Buffer.from(
50+
'000100000001000000010000000000000000000000000000000000000001000000130000001069736d6173746572000100000000',
51+
'hex'
52+
)
53+
],
54+
documents: [{ ismaster: 1 }]
55+
},
56+
2757
{
2858
description: 'valid OP_MSG',
2959
data: Buffer.from(
@@ -32,6 +62,19 @@ describe('Message Stream', function() {
3262
),
3363
documents: [{ $db: 'admin', ismaster: 1 }]
3464
},
65+
{
66+
description: 'valid multiple OP_MSG',
67+
expectedMessageCount: 4,
68+
data: Buffer.from(
69+
'370000000100000000000000dd0700000000000000220000001069736d6173746572000100000002246462000600000061646d696e0000' +
70+
'370000000100000000000000dd0700000000000000220000001069736d6173746572000100000002246462000600000061646d696e0000' +
71+
'370000000100000000000000dd0700000000000000220000001069736d6173746572000100000002246462000600000061646d696e0000' +
72+
'370000000100000000000000dd0700000000000000220000001069736d6173746572000100000002246462000600000061646d696e0000',
73+
'hex'
74+
),
75+
documents: [{ $db: 'admin', ismaster: 1 }]
76+
},
77+
3578
{
3679
description: 'Invalid message size (negative)',
3780
data: Buffer.from('ffffffff', 'hex'),
@@ -46,10 +89,13 @@ describe('Message Stream', function() {
4689
it(test.description, function(done) {
4790
const bson = new BSON();
4891
const error = test.error;
92+
const expectedMessageCount = test.expectedMessageCount || 1;
4993
const inputStream = bufferToStream(test.data);
5094
const messageStream = new MessageStream({ bson });
5195

96+
let messageCount = 0;
5297
messageStream.on('message', msg => {
98+
messageCount++;
5399
if (error) {
54100
done(new Error(`expected error: ${error}`));
55101
return;
@@ -63,7 +109,9 @@ describe('Message Stream', function() {
63109
.that.deep.equals(test.documents);
64110
}
65111

66-
done();
112+
if (messageCount === expectedMessageCount) {
113+
done();
114+
}
67115
});
68116

69117
messageStream.on('error', err => {

0 commit comments

Comments
 (0)