Skip to content

Queued query errors #1503

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Oct 3, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 93 additions & 32 deletions lib/client.js
Original file line number Diff line number Diff line change
@@ -36,6 +36,7 @@ var Client = function (config) {
this._connecting = false
this._connected = false
this._connectionError = false
this._queryable = true

this.connection = c.connection || new Connection({
stream: c.stream,
@@ -52,16 +53,31 @@ var Client = function (config) {

util.inherits(Client, EventEmitter)

Client.prototype.connect = function (callback) {
Client.prototype._errorAllQueries = function (err) {
const enqueueError = (query) => {
process.nextTick(() => {
query.handleError(err, this.connection)
})
}

if (this.activeQuery) {
enqueueError(this.activeQuery)
this.activeQuery = null
}

this.queryQueue.forEach(enqueueError)
this.queryQueue.length = 0
}

Client.prototype._connect = function (callback) {
var self = this
var con = this.connection
if (this._connecting || this._connected) {
const err = new Error('Client has already been connected. You cannot reuse a client.')
if (callback) {
process.nextTick(() => {
callback(err)
return undefined
}
return Promise.reject(err)
})
return
}
this._connecting = true

@@ -126,15 +142,25 @@ Client.prototype.connect = function (callback) {
}

const connectedErrorHandler = (err) => {
if (this.activeQuery) {
var activeQuery = self.activeQuery
this.activeQuery = null
return activeQuery.handleError(err, con)
}
this._queryable = false
this._errorAllQueries(err)
this.emit('error', err)
}

const connectedErrorMessageHandler = (msg) => {
const activeQuery = this.activeQuery

if (!activeQuery) {
connectedErrorHandler(msg)
return
}

this.activeQuery = null
activeQuery.handleError(msg, con)
}

con.on('error', connectingErrorHandler)
con.on('errorMessage', connectingErrorHandler)

// hook up query handling events to connection
// after the connection initially becomes ready for queries
@@ -143,7 +169,9 @@ Client.prototype.connect = function (callback) {
self._connected = true
self._attachListeners(con)
con.removeListener('error', connectingErrorHandler)
con.removeListener('errorMessage', connectingErrorHandler)
con.on('error', connectedErrorHandler)
con.on('errorMessage', connectedErrorMessageHandler)

// process possible callback argument to Client#connect
if (callback) {
@@ -166,43 +194,53 @@ Client.prototype.connect = function (callback) {
})

con.once('end', () => {
if (this.activeQuery) {
var disconnectError = new Error('Connection terminated')
this.activeQuery.handleError(disconnectError, con)
this.activeQuery = null
}
const error = this._ending
? new Error('Connection terminated')
: new Error('Connection terminated unexpectedly')

this._errorAllQueries(error)

if (!this._ending) {
// if the connection is ended without us calling .end()
// on this client then we have an unexpected disconnection
// treat this as an error unless we've already emitted an error
// during connection.
const error = new Error('Connection terminated unexpectedly')
if (this._connecting && !this._connectionError) {
if (callback) {
callback(error)
} else {
this.emit('error', error)
connectedErrorHandler(error)
}
} else if (!this._connectionError) {
this.emit('error', error)
connectedErrorHandler(error)
}
}
this.emit('end')

process.nextTick(() => {
this.emit('end')
})
})

con.on('notice', function (msg) {
self.emit('notice', msg)
})
}

if (!callback) {
return new global.Promise((resolve, reject) => {
this.once('error', reject)
this.once('connect', () => {
this.removeListener('error', reject)
Client.prototype.connect = function (callback) {
if (callback) {
this._connect(callback)
return
}

return new Promise((resolve, reject) => {
this._connect((error) => {
if (error) {
reject(error)
} else {
resolve()
})
}
})
}
})
}

Client.prototype._attachListeners = function (con) {
@@ -353,7 +391,15 @@ Client.prototype._pulseQueryQueue = function () {
if (this.activeQuery) {
this.readyForQuery = false
this.hasExecuted = true
this.activeQuery.submit(this.connection)

const queryError = this.activeQuery.submit(this.connection)
if (queryError) {
process.nextTick(() => {
this.activeQuery.handleError(queryError, this.connection)
this.readyForQuery = true
this._pulseQueryQueue()
})
}
} else if (this.hasExecuted) {
this.activeQuery = null
this.emit('drain')
@@ -389,25 +435,40 @@ Client.prototype.query = function (config, values, callback) {
query._result._getTypeParser = this._types.getTypeParser.bind(this._types)
}

if (!this._queryable) {
process.nextTick(() => {
query.handleError(new Error('Client has encountered a connection error and is not queryable'), this.connection)
})
return result
}

if (this._ending) {
process.nextTick(() => {
query.handleError(new Error('Client was closed and is not queryable'), this.connection)
})
return result
}

this.queryQueue.push(query)
this._pulseQueryQueue()
return result
}

Client.prototype.end = function (cb) {
this._ending = true

if (this.activeQuery) {
// if we have an active query we need to force a disconnect
// on the socket - otherwise a hung query could block end forever
this.connection.stream.destroy(new Error('Connection terminated by user'))
return cb ? cb() : Promise.resolve()
this.connection.stream.destroy()
} else {
this.connection.end()
}

if (cb) {
this.connection.end()
this.connection.once('end', cb)
} else {
return new global.Promise((resolve, reject) => {
this.connection.end()
return new Promise((resolve) => {
this.connection.once('end', resolve)
})
}
3 changes: 2 additions & 1 deletion lib/connection.js
Original file line number Diff line number Diff line change
@@ -117,10 +117,11 @@ Connection.prototype.attachListeners = function (stream) {
var packet = self._reader.read()
while (packet) {
var msg = self.parseMessage(packet)
var eventName = msg.name === 'error' ? 'errorMessage' : msg.name
if (self._emitMessage) {
self.emit('message', msg)
}
self.emit(msg.name, msg)
self.emit(eventName, msg)
packet = self._reader.read()
}
})
128 changes: 80 additions & 48 deletions lib/native/client.js
Original file line number Diff line number Diff line change
@@ -32,8 +32,10 @@ var Client = module.exports = function (config) {
})

this._queryQueue = []
this._connected = false
this._ending = false
this._connecting = false
this._connected = false
this._queryable = true

// keep these on the object for legacy reasons
// for the time being. TODO: deprecate all this jazz
@@ -52,50 +54,48 @@ Client.Query = NativeQuery

util.inherits(Client, EventEmitter)

Client.prototype._errorAllQueries = function (err) {
const enqueueError = (query) => {
process.nextTick(() => {
query.native = this.native
query.handleError(err)
})
}

if (this._hasActiveQuery()) {
enqueueError(this._activeQuery)
this._activeQuery = null
}

this._queryQueue.forEach(enqueueError)
this._queryQueue.length = 0
}

// connect to the backend
// pass an optional callback to be called once connected
// or with an error if there was a connection error
// if no callback is passed and there is a connection error
// the client will emit an error event.
Client.prototype.connect = function (cb) {
Client.prototype._connect = function (cb) {
var self = this

var onError = function (err) {
if (cb) return cb(err)
return self.emit('error', err)
}

var result
if (!cb) {
var resolveOut, rejectOut
cb = (err) => err ? rejectOut(err) : resolveOut()
result = new global.Promise(function (resolve, reject) {
resolveOut = resolve
rejectOut = reject
})
}

if (this._connecting) {
process.nextTick(() => cb(new Error('Client has already been connected. You cannot reuse a client.')))
return result
return
}

this._connecting = true

this.connectionParameters.getLibpqConnectionString(function (err, conString) {
if (err) return onError(err)
if (err) return cb(err)
self.native.connect(conString, function (err) {
if (err) return onError(err)
if (err) return cb(err)

// set internal states to connected
self._connected = true

// handle connection errors from the native layer
self.native.on('error', function (err) {
// error will be handled by active query
if (self._activeQuery && self._activeQuery.state !== 'end') {
return
}
self._queryable = false
self._errorAllQueries(err)
self.emit('error', err)
})

@@ -110,12 +110,26 @@ Client.prototype.connect = function (cb) {
self.emit('connect')
self._pulseQueryQueue(true)

// possibly call the optional callback
if (cb) cb()
cb()
})
})
}

return result
Client.prototype.connect = function (callback) {
if (callback) {
this._connect(callback)
return
}

return new Promise((resolve, reject) => {
this._connect((error) => {
if (error) {
reject(error)
} else {
resolve()
}
})
})
}

// send a query to the server
@@ -129,26 +143,43 @@ Client.prototype.connect = function (cb) {
// optional string rowMode = 'array' for an array of results
// }
Client.prototype.query = function (config, values, callback) {
var query
var result

if (typeof config.submit === 'function') {
result = query = config
// accept query(new Query(...), (err, res) => { }) style
if (typeof values === 'function') {
config.callback = values
}
this._queryQueue.push(config)
this._pulseQueryQueue()
return config
} else {
query = new NativeQuery(config, values, callback)
if (!query.callback) {
let resolveOut, rejectOut
result = new Promise((resolve, reject) => {
resolveOut = resolve
rejectOut = reject
})
query.callback = (err, res) => err ? rejectOut(err) : resolveOut(res)
}
}

var query = new NativeQuery(config, values, callback)
var result
if (!query.callback) {
let resolveOut, rejectOut
result = new Promise((resolve, reject) => {
resolveOut = resolve
rejectOut = reject
if (!this._queryable) {
query.native = this.native
process.nextTick(() => {
query.handleError(new Error('Client has encountered a connection error and is not queryable'))
})
return result
}

if (this._ending) {
query.native = this.native
process.nextTick(() => {
query.handleError(new Error('Client was closed and is not queryable'))
})
query.callback = (err, res) => err ? rejectOut(err) : resolveOut(res)
return result
}

this._queryQueue.push(query)
this._pulseQueryQueue()
return result
@@ -157,6 +188,9 @@ Client.prototype.query = function (config, values, callback) {
// disconnect from the backend server
Client.prototype.end = function (cb) {
var self = this

this._ending = true

if (!this._connected) {
this.once('connect', this.end.bind(this, cb))
}
@@ -170,14 +204,12 @@ Client.prototype.end = function (cb) {
})
}
this.native.end(function () {
// send an error to the active query
if (self._hasActiveQuery()) {
var msg = 'Connection terminated'
self._queryQueue.length = 0
self._activeQuery.handleError(new Error(msg))
}
self.emit('end')
if (cb) cb()
self._errorAllQueries(new Error('Connection terminated'))

process.nextTick(() => {
self.emit('end')
if (cb) cb()
})
})
return result
}
11 changes: 3 additions & 8 deletions lib/query.js
Original file line number Diff line number Diff line change
@@ -147,22 +147,17 @@ Query.prototype.handleError = function (err, connection) {

Query.prototype.submit = function (connection) {
if (typeof this.text !== 'string' && typeof this.name !== 'string') {
const err = new Error('A query must have either text or a name. Supplying neither is unsupported.')
connection.emit('error', err)
connection.emit('readyForQuery')
return
return new Error('A query must have either text or a name. Supplying neither is unsupported.')
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Returning an error doesn't seem like a widely-used practice. Why are we not throwing it?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would have to be caught to be passed to the callback and it wouldn’t be easy to distinguish from other errors at that point.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that you mention it, do we need to distinguish it from other errors? Reporting an error that occurred in the stack when calling Query.submit as a "query error" sounds rather intuitive.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There shouldn’t be any other errors so I prefer not to treat those like they’re expected if they do occur.

}
if (this.values && !Array.isArray(this.values)) {
const err = new Error('Query values must be an array')
connection.emit('error', err)
connection.emit('readyForQuery')
return
return new Error('Query values must be an array')
}
if (this.requiresPreparation()) {
this.prepare(connection)
} else {
connection.query(this.text)
}
return null
}

Query.prototype.hasBeenParsed = function (connection) {
18 changes: 18 additions & 0 deletions test/integration/client/error-handling-tests.js
Original file line number Diff line number Diff line change
@@ -50,6 +50,18 @@ suite.test('re-using connections results in promise rejection', (done) => {
})
})

suite.test('using a client after closing it results in error', (done) => {
const client = new Client()
client.connect(() => {
client.end(assert.calls(() => {
client.query('SELECT 1', assert.calls((err) => {
assert.equal(err.message, 'Client was closed and is not queryable')
done()
}))
}))
})
})

suite.test('query receives error on client shutdown', function (done) {
var client = new Client()
client.connect(assert.success(function () {
@@ -139,6 +151,9 @@ suite.test('when connecting to an invalid host with callback', function (done) {
var client = new Client({
user: 'very invalid username'
})
client.on('error', () => {
assert.fail('unexpected error event when connecting')
})
client.connect(function (error, client) {
assert(error instanceof Error)
done()
@@ -149,6 +164,9 @@ suite.test('when connecting to invalid host with promise', function (done) {
var client = new Client({
user: 'very invalid username'
})
client.on('error', () => {
assert.fail('unexpected error event when connecting')
})
client.connect().catch((e) => done())
})

59 changes: 56 additions & 3 deletions test/integration/connection-pool/error-tests.js
Original file line number Diff line number Diff line change
@@ -2,16 +2,15 @@
var helper = require('./test-helper')
const pg = helper.pg

// make pool hold 2 clients
const pool = new pg.Pool({ max: 2 })

const suite = new helper.Suite()
suite.test('connecting to invalid port', (cb) => {
const pool = new pg.Pool({ port: 13801 })
pool.connect().catch(e => cb())
})

suite.test('errors emitted on pool', (cb) => {
// make pool hold 2 clients
const pool = new pg.Pool({ max: 2 })
// get first client
pool.connect(assert.success(function (client, done) {
client.id = 1
@@ -46,3 +45,57 @@ suite.test('errors emitted on pool', (cb) => {
})
}))
})

suite.test('connection-level errors cause queued queries to fail', (cb) => {
const pool = new pg.Pool()
pool.connect(assert.success((client, done) => {
client.query('SELECT pg_terminate_backend(pg_backend_pid())', assert.calls((err) => {
if (helper.args.native) {
assert.ok(err)
} else {
assert.equal(err.code, '57P01')
}
}))

pool.once('error', assert.calls((err, brokenClient) => {
assert.equal(client, brokenClient)
}))

client.query('SELECT 1', assert.calls((err) => {
if (helper.args.native) {
assert.ok(err)
} else {
assert.equal(err.message, 'Connection terminated unexpectedly')
}

done()
pool.end()
cb()
}))
}))
})

suite.test('connection-level errors cause future queries to fail', (cb) => {
const pool = new pg.Pool()
pool.connect(assert.success((client, done) => {
client.query('SELECT pg_terminate_backend(pg_backend_pid())', assert.calls((err) => {
if (helper.args.native) {
assert.ok(err)
} else {
assert.equal(err.code, '57P01')
}
}))

pool.once('error', assert.calls((err, brokenClient) => {
assert.equal(client, brokenClient)

client.query('SELECT 1', assert.calls((err) => {
assert.equal(err.message, 'Client has encountered a connection error and is not queryable')

done()
pool.end()
cb()
}))
}))
}))
})