Skip to content

Cursor: avoid closing connection twice if error received after destroy() #2836

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Mar 15, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions packages/pg-cursor/index.js
Original file line number Diff line number Diff line change
@@ -151,6 +151,9 @@ class Cursor extends EventEmitter {
}

handleError(msg) {
// If this cursor has already closed, don't try to handle the error.
if (this.state === 'done') return

// If we're in an initialized state we've never been submitted
// and don't have a connection instance reference yet.
// This can happen if you queue a stream and close the client before
81 changes: 81 additions & 0 deletions packages/pg-query-stream/test/error.ts
Original file line number Diff line number Diff line change
@@ -89,4 +89,85 @@ describe('error recovery', () => {
await client.end()
})
})

it('should work if used after timeout error', async () => {
const pool = new Pool({ max: 1, connectionTimeoutMillis: 400, statement_timeout: 400 })

const res1 = await pool.query('SELECT 1 AS a')
assert.deepStrictEqual(res1.rows, [{ a: 1 }])

const query = new QueryStream('SELECT 2 AS b')
const client = await pool.connect()
const stream = await client.query(query)

await assert.rejects(() => pool.query('SELECT TRUE'), { message: 'timeout exceeded when trying to connect' })

await stream.destroy()
await client.release()

const res2 = await pool.query('SELECT 4 AS d')
assert.deepStrictEqual(res2.rows, [{ d: 4 }])

await pool.end()
})

it('should work if used after syntax error', async () => {
const pool = new Pool({ max: 1, statement_timeout: 100 }) // statement_timeout is required here, so maybe this is just another timeout error?

const res1 = await pool.query('SELECT 1 AS a')
assert.deepStrictEqual(res1.rows, [{ a: 1 }])

const query = new QueryStream('SELECT 2 AS b')
const client = await pool.connect()
const stream = await client.query(query)

await new Promise((resolve) => setTimeout(resolve, 10))

await stream.destroy()
await client.release()

const res2 = await pool.query('SELECT 4 AS d')
assert.deepStrictEqual(res2.rows, [{ d: 4 }])

await pool.end()
})

it('should work after cancelling query', async () => {
const pool = new Pool()
const conn = await pool.connect()

// Get connection PID for sake of pg_cancel_backend() call
const result = await conn.query('SELECT pg_backend_pid() AS pid;')
const { pid } = result.rows[0]

const stream = conn.query(new QueryStream('SELECT pg_sleep(10);'))
stream.on('data', (chunk) => {
// Switches stream into readableFlowing === true mode
})
stream.on('error', (err) => {
// Errors are expected due to pg_cancel_backend() call
})

// Create a promise that is resolved when the stream is closed
const closed = new Promise((res) => {
stream.on('close', res)
})

// Wait 100ms before cancelling the query
await new Promise((res) => setTimeout(res, 100))

// Cancel pg_sleep(10) query
await pool.query('SELECT pg_cancel_backend($1);', [pid])

// Destroy stream and wait for it to be closed
stream.destroy()
await closed

// Subsequent query on same connection should succeed
const res = await conn.query('SELECT 1 AS a;')
assert.deepStrictEqual(res.rows, [{ a: 1 }])

conn.release()
await pool.end()
})
})