From 4d7773183af9de4e3473a55b74a2646c8e1c2120 Mon Sep 17 00:00:00 2001 From: alxndrsn Date: Mon, 3 Oct 2022 11:17:37 +0300 Subject: [PATCH 1/5] stream-use-after-error: failing test case --- packages/pg-query-stream/test/close.ts | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/packages/pg-query-stream/test/close.ts b/packages/pg-query-stream/test/close.ts index 97e4627d9..db2f8c404 100644 --- a/packages/pg-query-stream/test/close.ts +++ b/packages/pg-query-stream/test/close.ts @@ -1,5 +1,6 @@ import assert from 'assert' import concat from 'concat-stream' +import pg from 'pg' import QueryStream from '../src' import helper from './helper' @@ -90,4 +91,25 @@ if (process.version.startsWith('v8.')) { stream.on('close', done) }) }) + + describe('use after error', () => { + it('should work if used after error', async () => { + const pool = new pg.Pool({ max: 1, connectionTimeoutMillis: 400, statement_timeout: 400 }); + + const res1 = await pool.query('SELECT TRUE'); + assert.deepStrictEqual(res1.rows, [ { bool:true } ]); + + const query = new QueryStream('SELECT TRUE'); + const client = await pool.connect(); + const stream = await client.query(query); + + await assert.rejects(() => pool.query('SELECT TRUE'), { message: 'timeout exceeded when trying to connect' }); + + await stream.destroy(); + await client.release(); + + const res2 = await pool.query('SELECT TRUE'); + assert.deepStrictEqual(res2.rows, [ { bool:true } ]); + }) + }) } From 3830a0f9b69a8b88c3a9d4b9e0cd8172dc396feb Mon Sep 17 00:00:00 2001 From: alxndrsn Date: Mon, 3 Oct 2022 11:31:02 +0300 Subject: [PATCH 2/5] Re-add .only() to reflect example usage in PR --- packages/pg-query-stream/test/close.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/pg-query-stream/test/close.ts b/packages/pg-query-stream/test/close.ts index db2f8c404..c31a32445 100644 --- a/packages/pg-query-stream/test/close.ts +++ b/packages/pg-query-stream/test/close.ts @@ -93,7 +93,7 @@ if (process.version.startsWith('v8.')) { }) describe('use after error', () => { - it('should work if used after error', async () => { + it.only('should work if used after error', async () => { const pool = new pg.Pool({ max: 1, connectionTimeoutMillis: 400, statement_timeout: 400 }); const res1 = await pool.query('SELECT TRUE'); From c48f778dafaa3effdd9be9c0d9d7ca6cdfc38459 Mon Sep 17 00:00:00 2001 From: alxndrsn Date: Mon, 3 Oct 2022 12:03:25 +0300 Subject: [PATCH 3/5] Add optional hack to reduce likelihood of race condition --- packages/pg-query-stream/test/close.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/pg-query-stream/test/close.ts b/packages/pg-query-stream/test/close.ts index c31a32445..92755bee7 100644 --- a/packages/pg-query-stream/test/close.ts +++ b/packages/pg-query-stream/test/close.ts @@ -106,6 +106,8 @@ if (process.version.startsWith('v8.')) { await assert.rejects(() => pool.query('SELECT TRUE'), { message: 'timeout exceeded when trying to connect' }); await stream.destroy(); + if(process.env.WAIT_AFTER_CLOSE) await new Promise(resolve => setTimeout(resolve, 100)); + await client.release(); const res2 = await pool.query('SELECT TRUE'); From 477fa98b646be438c8ceadacf3ebced3aebad64a Mon Sep 17 00:00:00 2001 From: alxndrsn Date: Mon, 3 Oct 2022 12:58:59 +0300 Subject: [PATCH 4/5] Close the pool at the end of the test --- packages/pg-query-stream/test/close.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/pg-query-stream/test/close.ts b/packages/pg-query-stream/test/close.ts index 92755bee7..61677332a 100644 --- a/packages/pg-query-stream/test/close.ts +++ b/packages/pg-query-stream/test/close.ts @@ -112,6 +112,8 @@ if (process.version.startsWith('v8.')) { const res2 = await pool.query('SELECT TRUE'); assert.deepStrictEqual(res2.rows, [ { bool:true } ]); + + await pool.end(); }) }) } From efe64b087d43f6cd4ec72179a141ced0f075bfba Mon Sep 17 00:00:00 2001 From: alxndrsn Date: Mon, 3 Oct 2022 12:59:26 +0300 Subject: [PATCH 5/5] Add alternative "fix" --- packages/pg-query-stream/src/index.ts | 3 ++- packages/pg-query-stream/test/close.ts | 6 +++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/packages/pg-query-stream/src/index.ts b/packages/pg-query-stream/src/index.ts index c942b0441..8eb275b41 100644 --- a/packages/pg-query-stream/src/index.ts +++ b/packages/pg-query-stream/src/index.ts @@ -45,7 +45,8 @@ class QueryStream extends Readable implements Submittable { } public _destroy(_err: Error, cb: Function) { - this.cursor.close((err?: Error) => { + this.cursor.close(async (err?: Error) => { + if(process.env.WAIT_AFTER_CLOSE === '2') await new Promise(resolve => setTimeout(resolve, 100)); cb(err || _err) }) } diff --git a/packages/pg-query-stream/test/close.ts b/packages/pg-query-stream/test/close.ts index 61677332a..8a81a7c71 100644 --- a/packages/pg-query-stream/test/close.ts +++ b/packages/pg-query-stream/test/close.ts @@ -106,7 +106,11 @@ if (process.version.startsWith('v8.')) { await assert.rejects(() => pool.query('SELECT TRUE'), { message: 'timeout exceeded when trying to connect' }); await stream.destroy(); - if(process.env.WAIT_AFTER_CLOSE) await new Promise(resolve => setTimeout(resolve, 100)); + if(process.env.WAIT_AFTER_CLOSE === '1') await new Promise(resolve => setTimeout(resolve, 100)); + + if(process.env.WAIT_AFTER_CLOSE === '2') { + await new Promise(resolve => stream.once('close', resolve)); + } await client.release();