Skip to content

Extend test for issue 3174 - add cursor cases #3345

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions packages/pg-cursor/index.js
Original file line number Diff line number Diff line change
@@ -112,6 +112,12 @@ class Cursor extends EventEmitter {
handleDataRow(msg) {
const row = this._result.parseRow(msg.fields)
this.emit('row', row, this._result)

if (this._rows == null) {
const error = new Error('Received unexpected dataRow message from backend.')
this.handleError(error)
return
}
this._rows.push(row)
}

10 changes: 10 additions & 0 deletions packages/pg/lib/client.js
Original file line number Diff line number Diff line change
@@ -357,11 +357,21 @@ class Client extends EventEmitter {
}

_handleRowDescription(msg) {
if (this.activeQuery == null) {
const error = new Error('Received unexpected rowDescription message from backend.')
this._handleErrorEvent(error)
return
}
// delegate rowDescription to active query
this.activeQuery.handleRowDescription(msg)
}

_handleDataRow(msg) {
if (this.activeQuery == null) {
const error = new Error('Received unexpected dataRow message from backend.')
this._handleErrorEvent(error)
return
}
// delegate dataRow to active query
this.activeQuery.handleDataRow(msg)
}
89 changes: 86 additions & 3 deletions packages/pg/test/integration/gh-issues/3174-tests.js
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@ const buffers = require('../../test-buffers')
const helper = require('../test-helper')
const assert = require('assert')
const cli = require('../../cli')
const Cursor = require('../../../../pg-cursor')

const suite = new helper.Suite()

@@ -103,9 +104,8 @@ const testErrorBuffer = (bufferName, errorBuffer) => {
if (!cli.native) {
assert(errorHit)
// further queries on the client should fail since its in an invalid state
await assert.rejects(() => client.query('SELECTR NOW()'), 'Further queries on the client should reject')
await assert.rejects(() => client.query('SELECT NOW()'), 'Further queries on the client should reject')
}

await closeServer()
})

@@ -128,7 +128,7 @@ const testErrorBuffer = (bufferName, errorBuffer) => {
if (!cli.native) {
assert(errorHit)
// further queries on the client should fail since its in an invalid state
await assert.rejects(() => client.query('SELECTR NOW()'), 'Further queries on the client should reject')
await assert.rejects(() => client.query('SELECT NOW()'), 'Further queries on the client should reject')
}

await client.end()
@@ -159,9 +159,92 @@ const testErrorBuffer = (bufferName, errorBuffer) => {
await pool.end()
await closeServer()
})

suite.testAsync(`Out of order ${bufferName} on simple query using cursors is catchable`, async () => {
const closeServer = await new Promise((resolve, reject) => {
return startMockServer(options.port, errorBuffer, (closeServer) => resolve(closeServer))
})
const client = new helper.Client(options)
await client.connect()

let errorHit = false
client.on('error', () => {
errorHit = true
})

const cursor = await client.query(new Cursor('SELECT NOW()'))
cursor.read(100, () => {})
await cursor.close()
await delay(50)

// the native client only emits a notice message and keeps on its merry way
if (!cli.native) {
assert(errorHit)
// further queries on the client should fail since its in an invalid state
await assert.rejects(() => client.query('SELECT NOW()'), 'Further queries on the client should reject')
}
await closeServer()
})

suite.testAsync(`Out of order ${bufferName} on extended query using cursors is catchable`, async () => {
const closeServer = await new Promise((resolve, reject) => {
return startMockServer(options.port, errorBuffer, (closeServer) => resolve(closeServer))
})
const client = new helper.Client(options)
await client.connect()

let errorHit = false
client.on('error', () => {
errorHit = true
})

const cursor = await client.query(new Cursor('SELECT $1', ['foo']))
cursor.read(100, () => {})
await cursor.close()
await delay(50)

// the native client only emits a notice message and keeps on its merry way
if (!cli.native) {
assert(errorHit)
// further queries on the client should fail since its in an invalid state
await assert.rejects(() => client.query('SELECT NOW()'), 'Further queries on the client should reject')
}

await client.end()

await closeServer()
})

suite.testAsync(`Out of order ${bufferName} on pool using cursors is catchable`, async () => {
const closeServer = await new Promise((resolve, reject) => {
return startMockServer(options.port, errorBuffer, (closeServer) => resolve(closeServer))
})
const pool = new helper.pg.Pool(options)

let errorHit = false
pool.on('error', () => {
errorHit = true
})

const cursor = await pool.query(new Cursor('SELECT $1', ['foo']))
cursor.read(100, () => {})
await cursor.close()
await delay(100)

if (!cli.native) {
assert(errorHit)
assert.strictEqual(pool.idleCount, 0, 'Pool should have no idle clients')
assert.strictEqual(pool.totalCount, 0, 'Pool should have no connected clients')
}

await pool.end()
await closeServer()
})
}

if (!helper.args.native) {
testErrorBuffer('parseComplete', buffers.parseComplete())
testErrorBuffer('commandComplete', buffers.commandComplete('f'))
testErrorBuffer('rowDescription', buffers.rowDescription())
testErrorBuffer('dataRow', buffers.dataRow())
}