Skip to content

Commit 3828aa8

Browse files
charmanderbrianc
authored andcommitted
Queued query errors (#1503)
* Add tests for query callbacks after connection-level errors * Ensure callbacks are executed for all queued queries after connection-level errors Separates socket errors from error messages, sends socket errors to all queries in the queue, marks clients as unusable after socket errors. This is not very pleasant but should maintain backwards compatibility…? * Always call `handleError` asynchronously This doesn’t match the original behaviour of the type errors, but it’s correct. * Fix return value of `Client.prototype.query` in immediate error cases * Mark clients with closed connections as unusable consistently * Add tests for error event when connecting Client * Ensure the promise and callback versions of Client#connect always have the same behaviour * Give same error to queued queries as to active query when ending and do so in the native Client as well. * Restore original ordering between queued query callbacks and 'end' event
1 parent fed6375 commit 3828aa8

File tree

6 files changed

+252
-92
lines changed

6 files changed

+252
-92
lines changed

lib/client.js

+93-32
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ var Client = function (config) {
3636
this._connecting = false
3737
this._connected = false
3838
this._connectionError = false
39+
this._queryable = true
3940

4041
this.connection = c.connection || new Connection({
4142
stream: c.stream,
@@ -52,16 +53,31 @@ var Client = function (config) {
5253

5354
util.inherits(Client, EventEmitter)
5455

55-
Client.prototype.connect = function (callback) {
56+
Client.prototype._errorAllQueries = function (err) {
57+
const enqueueError = (query) => {
58+
process.nextTick(() => {
59+
query.handleError(err, this.connection)
60+
})
61+
}
62+
63+
if (this.activeQuery) {
64+
enqueueError(this.activeQuery)
65+
this.activeQuery = null
66+
}
67+
68+
this.queryQueue.forEach(enqueueError)
69+
this.queryQueue.length = 0
70+
}
71+
72+
Client.prototype._connect = function (callback) {
5673
var self = this
5774
var con = this.connection
5875
if (this._connecting || this._connected) {
5976
const err = new Error('Client has already been connected. You cannot reuse a client.')
60-
if (callback) {
77+
process.nextTick(() => {
6178
callback(err)
62-
return undefined
63-
}
64-
return Promise.reject(err)
79+
})
80+
return
6581
}
6682
this._connecting = true
6783

@@ -126,15 +142,25 @@ Client.prototype.connect = function (callback) {
126142
}
127143

128144
const connectedErrorHandler = (err) => {
129-
if (this.activeQuery) {
130-
var activeQuery = self.activeQuery
131-
this.activeQuery = null
132-
return activeQuery.handleError(err, con)
133-
}
145+
this._queryable = false
146+
this._errorAllQueries(err)
134147
this.emit('error', err)
135148
}
136149

150+
const connectedErrorMessageHandler = (msg) => {
151+
const activeQuery = this.activeQuery
152+
153+
if (!activeQuery) {
154+
connectedErrorHandler(msg)
155+
return
156+
}
157+
158+
this.activeQuery = null
159+
activeQuery.handleError(msg, con)
160+
}
161+
137162
con.on('error', connectingErrorHandler)
163+
con.on('errorMessage', connectingErrorHandler)
138164

139165
// hook up query handling events to connection
140166
// after the connection initially becomes ready for queries
@@ -143,7 +169,9 @@ Client.prototype.connect = function (callback) {
143169
self._connected = true
144170
self._attachListeners(con)
145171
con.removeListener('error', connectingErrorHandler)
172+
con.removeListener('errorMessage', connectingErrorHandler)
146173
con.on('error', connectedErrorHandler)
174+
con.on('errorMessage', connectedErrorMessageHandler)
147175

148176
// process possible callback argument to Client#connect
149177
if (callback) {
@@ -166,43 +194,53 @@ Client.prototype.connect = function (callback) {
166194
})
167195

168196
con.once('end', () => {
169-
if (this.activeQuery) {
170-
var disconnectError = new Error('Connection terminated')
171-
this.activeQuery.handleError(disconnectError, con)
172-
this.activeQuery = null
173-
}
197+
const error = this._ending
198+
? new Error('Connection terminated')
199+
: new Error('Connection terminated unexpectedly')
200+
201+
this._errorAllQueries(error)
202+
174203
if (!this._ending) {
175204
// if the connection is ended without us calling .end()
176205
// on this client then we have an unexpected disconnection
177206
// treat this as an error unless we've already emitted an error
178207
// during connection.
179-
const error = new Error('Connection terminated unexpectedly')
180208
if (this._connecting && !this._connectionError) {
181209
if (callback) {
182210
callback(error)
183211
} else {
184-
this.emit('error', error)
212+
connectedErrorHandler(error)
185213
}
186214
} else if (!this._connectionError) {
187-
this.emit('error', error)
215+
connectedErrorHandler(error)
188216
}
189217
}
190-
this.emit('end')
218+
219+
process.nextTick(() => {
220+
this.emit('end')
221+
})
191222
})
192223

193224
con.on('notice', function (msg) {
194225
self.emit('notice', msg)
195226
})
227+
}
196228

197-
if (!callback) {
198-
return new global.Promise((resolve, reject) => {
199-
this.once('error', reject)
200-
this.once('connect', () => {
201-
this.removeListener('error', reject)
229+
Client.prototype.connect = function (callback) {
230+
if (callback) {
231+
this._connect(callback)
232+
return
233+
}
234+
235+
return new Promise((resolve, reject) => {
236+
this._connect((error) => {
237+
if (error) {
238+
reject(error)
239+
} else {
202240
resolve()
203-
})
241+
}
204242
})
205-
}
243+
})
206244
}
207245

208246
Client.prototype._attachListeners = function (con) {
@@ -340,7 +378,15 @@ Client.prototype._pulseQueryQueue = function () {
340378
if (this.activeQuery) {
341379
this.readyForQuery = false
342380
this.hasExecuted = true
343-
this.activeQuery.submit(this.connection)
381+
382+
const queryError = this.activeQuery.submit(this.connection)
383+
if (queryError) {
384+
process.nextTick(() => {
385+
this.activeQuery.handleError(queryError, this.connection)
386+
this.readyForQuery = true
387+
this._pulseQueryQueue()
388+
})
389+
}
344390
} else if (this.hasExecuted) {
345391
this.activeQuery = null
346392
this.emit('drain')
@@ -379,25 +425,40 @@ Client.prototype.query = function (config, values, callback) {
379425
query._result._getTypeParser = this._types.getTypeParser.bind(this._types)
380426
}
381427

428+
if (!this._queryable) {
429+
process.nextTick(() => {
430+
query.handleError(new Error('Client has encountered a connection error and is not queryable'), this.connection)
431+
})
432+
return result
433+
}
434+
435+
if (this._ending) {
436+
process.nextTick(() => {
437+
query.handleError(new Error('Client was closed and is not queryable'), this.connection)
438+
})
439+
return result
440+
}
441+
382442
this.queryQueue.push(query)
383443
this._pulseQueryQueue()
384444
return result
385445
}
386446

387447
Client.prototype.end = function (cb) {
388448
this._ending = true
449+
389450
if (this.activeQuery) {
390451
// if we have an active query we need to force a disconnect
391452
// on the socket - otherwise a hung query could block end forever
392-
this.connection.stream.destroy(new Error('Connection terminated by user'))
393-
return cb ? cb() : Promise.resolve()
453+
this.connection.stream.destroy()
454+
} else {
455+
this.connection.end()
394456
}
457+
395458
if (cb) {
396-
this.connection.end()
397459
this.connection.once('end', cb)
398460
} else {
399-
return new global.Promise((resolve, reject) => {
400-
this.connection.end()
461+
return new Promise((resolve) => {
401462
this.connection.once('end', resolve)
402463
})
403464
}

lib/connection.js

+2-1
Original file line numberDiff line numberDiff line change
@@ -117,10 +117,11 @@ Connection.prototype.attachListeners = function (stream) {
117117
var packet = self._reader.read()
118118
while (packet) {
119119
var msg = self.parseMessage(packet)
120+
var eventName = msg.name === 'error' ? 'errorMessage' : msg.name
120121
if (self._emitMessage) {
121122
self.emit('message', msg)
122123
}
123-
self.emit(msg.name, msg)
124+
self.emit(eventName, msg)
124125
packet = self._reader.read()
125126
}
126127
})

0 commit comments

Comments
 (0)