Skip to content

Commit 8d44a39

Browse files
committed
Merge pull request #536 from brianc/packet-reader
This moves the packet reading and chunking into a separate module
2 parents 2716f95 + 79f0394 commit 8d44a39

File tree

3 files changed

+26
-64
lines changed

3 files changed

+26
-64
lines changed

Diff for: lib/connection.js

+21-62
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ var util = require('util');
55

66
var utils = require(__dirname + '/utils');
77
var Writer = require('buffer-writer');
8+
var Reader = require('packet-reader');
89

910
var TEXT_MODE = 0;
1011
var BINARY_MODE = 1;
@@ -23,6 +24,10 @@ var Connection = function(config) {
2324
this._ending = false;
2425
this._mode = TEXT_MODE;
2526
this._emitMessage = false;
27+
this._reader = new Reader({
28+
headerSize: 1,
29+
lengthPadding: -4
30+
});
2631
var self = this;
2732
this.on('newListener', function(eventName) {
2833
if(eventName == 'message') {
@@ -87,17 +92,19 @@ Connection.prototype.connect = function(port, host) {
8792
};
8893

8994
Connection.prototype.attachListeners = function(stream) {
95+
var self = this;
9096
stream.on('data', function(buff) {
91-
this.setBuffer(buff);
92-
var msg = this.parseMessage();
93-
while(msg) {
94-
if(this._emitMessage) {
95-
this.emit('message', msg);
97+
self._reader.addChunk(buff);
98+
var packet = self._reader.read();
99+
while(packet) {
100+
var msg = self.parseMessage(packet);
101+
if(self._emitMessage) {
102+
self.emit('message', msg);
96103
}
97-
this.emit(msg.name, msg);
98-
msg = this.parseMessage();
104+
self.emit(msg.name, msg);
105+
packet = self._reader.read();
99106
}
100-
}.bind(this));
107+
});
101108
};
102109

103110
Connection.prototype.requestSsl = function(config) {
@@ -306,63 +313,16 @@ Connection.prototype.sendCopyFail = function (msg) {
306313
this._send(0x66);
307314
};
308315

309-
//parsing methods
310-
Connection.prototype.setBuffer = function(buffer) {
311-
if(this.lastBuffer) { //we have unfinished biznaz
312-
//need to combine last two buffers
313-
var remaining = this.lastBuffer.length - this.lastOffset;
314-
var combinedBuffer = new Buffer(buffer.length + remaining);
315-
this.lastBuffer.copy(combinedBuffer, 0, this.lastOffset);
316-
buffer.copy(combinedBuffer, remaining, 0);
317-
buffer = combinedBuffer;
318-
}
319-
this.lastBuffer = false;
320-
this.buffer = buffer;
321-
this.offset = 0;
322-
};
323-
324-
Connection.prototype.readSslResponse = function() {
325-
var remaining = this.buffer.length - (this.offset);
326-
if(remaining < 1) {
327-
this.lastBuffer = this.buffer;
328-
this.lastOffset = this.offset;
329-
return false;
330-
}
331-
return {
332-
name: 'sslresponse',
333-
text: this.buffer[this.offset++]
334-
};
335-
};
336-
337316
var Message = function(name, length) {
338317
this.name = name;
339318
this.length = length;
340319
};
341320

342-
Connection.prototype.parseMessage = function() {
343-
var remaining = this.buffer.length - (this.offset);
344-
if(remaining < 5) {
345-
//cannot read id + length without at least 5 bytes
346-
//just abort the read now
347-
this.lastBuffer = this.buffer;
348-
this.lastOffset = this.offset;
349-
return false;
350-
}
351-
352-
//read message id code
353-
var id = this.buffer[this.offset++];
354-
var buffer = this.buffer;
355-
//read message length
356-
var length = this.parseInt32(buffer);
357-
358-
if(remaining <= length) {
359-
this.lastBuffer = this.buffer;
360-
//rewind the last 5 bytes we read
361-
this.lastOffset = this.offset-5;
362-
return false;
363-
}
321+
Connection.prototype.parseMessage = function(buffer) {
364322

365-
switch(id)
323+
this.offset = 0;
324+
var length = buffer.length + 4;
325+
switch(this._reader.header)
366326
{
367327

368328
case 0x52: //R
@@ -422,7 +382,6 @@ Connection.prototype.parseMessage = function() {
422382
case 0x64: //d
423383
return this.parsed(buffer, length);
424384
}
425-
return false;
426385
};
427386

428387
Connection.prototype.parseR = function(buffer, length) {
@@ -440,7 +399,7 @@ Connection.prototype.parseR = function(buffer, length) {
440399
if(code === 5) { //md5 required
441400
msg.name = 'authenticationMD5Password';
442401
msg.salt = new Buffer(4);
443-
this.buffer.copy(msg.salt, 0, this.offset, this.offset + 4);
402+
buffer.copy(msg.salt, 0, this.offset, this.offset + 4);
444403
this.offset += 4;
445404
return msg;
446405
}
@@ -610,7 +569,7 @@ Connection.prototype.parseH = function(buffer, length) {
610569
};
611570

612571
Connection.prototype.parseGH = function (buffer, msg) {
613-
var isBinary = this.buffer[this.offset] !== 0;
572+
var isBinary = buffer[this.offset] !== 0;
614573
this.offset++;
615574
msg.binary = isBinary;
616575
var columnCount = this.parseInt16(buffer);

Diff for: package.json

+2-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@
2121
"generic-pool": "2.0.3",
2222
"buffer-writer": "1.0.0",
2323
"pgpass": "0.0.1",
24-
"nan": "~0.6.0"
24+
"nan": "~0.6.0",
25+
"packet-reader": "0.2.0"
2526
},
2627
"devDependencies": {
2728
"jshint": "1.1.0",

Diff for: test/unit/client/typed-query-results-tests.js

+3-1
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,9 @@ test('typed results', function() {
121121
dataTypeID: 1082,
122122
actual: '2010-10-31',
123123
expected: function(val) {
124-
assert.UTCDate(val, 2010, 9, 31, 0, 0, 0, 0);
124+
var now = new Date(2010, 9, 31)
125+
assert.UTCDate(val, 2010, now.getUTCMonth(), now.getUTCDate(), now.getUTCHours(), 0, 0, 0);
126+
assert.equal(val.getHours(), now.getHours())
125127
}
126128
},{
127129
name: 'interval time',

0 commit comments

Comments
 (0)