
Commit 8eca181

Fix pg-query-stream implementation (#2051)
* Fix pg-query-stream. There were some subtle behaviors where the stream was implemented incorrectly and did not work as expected with async iteration. I've modified the code based on #2050 and the comments in #2035 to get better test coverage of async iterables, and updated the internals significantly to more closely match the readable-stream interface. Note: this is a __breaking__ (semver major) change to this package, as the close event behavior changes slightly and `highWaterMark` is no longer supported. It shouldn't impact most usage, but it is breaking regardless.
* Remove a bunch of additional code
* Add test for destroy + error propagation
* Add failing test for destroying unsubmitted stream
* Do not throw an uncatchable error when closing an unused cursor
1 parent 6d93951 commit 8eca181
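
Since the commit message describes a switch from the custom `close(callback)` API to the standard readable-stream interface, here is a minimal, hedged sketch of how a consumer would now read rows via async iteration. The connection settings (taken from the environment) and the `generate_series` query text are placeholders for illustration, not part of this commit:

const pg = require('pg')
const QueryStream = require('pg-query-stream')

async function main () {
  // assumes PG* environment variables provide the connection settings
  const pool = new pg.Pool()
  const client = await pool.connect()
  try {
    // batchSize controls how many rows each cursor read requests;
    // after this change it also becomes the stream's highWaterMark
    const stream = client.query(new QueryStream('SELECT * FROM generate_series(0, $1) num', [1000], { batchSize: 100 }))
    for await (const row of stream) {
      console.log(row.num)
    }
  } finally {
    client.release()
    await pool.end()
  }
}

main().catch(console.error)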

7 files changed: +138 −61 lines


packages/pg-cursor/index.js

+1 −1
@@ -182,7 +182,7 @@ Cursor.prototype.end = util.deprecate(function (cb) {
 }, 'Cursor.end is deprecated. Call end on the client itself to end a connection to the database.')
 
 Cursor.prototype.close = function (cb) {
-  if (this.state === 'done') {
+  if (!this.connection || this.state === 'done') {
     if (cb) {
       return setImmediate(cb)
     } else {
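
For illustration, the guard above makes closing a cursor that was never submitted a harmless no-op (the new test below exercises exactly this). A rough sketch, with an arbitrary placeholder query text:

const Cursor = require('pg-cursor')

// a cursor that is constructed but never passed to client.query(...),
// so it has no connection yet
const cursor = new Cursor('SELECT NOW()')

// before this change the missing connection caused an uncatchable error;
// with the guard the callback simply fires on the next tick
cursor.close(() => console.log('closed an unsubmitted cursor'))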

packages/pg-cursor/test/close.js

+5
@@ -46,4 +46,9 @@ describe('close', function () {
       })
     })
   })
+
+  it('is a no-op to "close" the cursor before submitting it', function (done) {
+    const cursor = new Cursor(text)
+    cursor.close(done)
+  })
 })

packages/pg-query-stream/README.md

+1 −1
@@ -47,7 +47,7 @@ I'm very open to contribution! Open a pull request with your code or idea and w
 
 The MIT License (MIT)
 
-Copyright (c) 2013 Brian M. Carlson
+Copyright (c) 2013-2019 Brian M. Carlson
 
 Permission is hereby granted, free of charge, to any person obtaining a copy
 of this software and associated documentation files (the "Software"), to deal

packages/pg-query-stream/index.js

+20 −37
@@ -1,14 +1,12 @@
-'use strict'
-var Cursor = require('pg-cursor')
-var Readable = require('stream').Readable
+const { Readable } = require('stream')
+const Cursor = require('pg-cursor')
 
 class PgQueryStream extends Readable {
-  constructor (text, values, options) {
-    super(Object.assign({ objectMode: true }, options))
-    this.cursor = new Cursor(text, values, options)
-    this._reading = false
-    this._closed = false
-    this.batchSize = (options || {}).batchSize || 100
+  constructor(text, values, config = {}) {
+    const { batchSize = 100 } = config;
+    // https://nodejs.org/api/stream.html#stream_new_stream_readable_options
+    super({ objectMode: true, emitClose: true, autoDestroy: true, highWaterMark: batchSize })
+    this.cursor = new Cursor(text, values, config)
 
     // delegate Submittable callbacks to cursor
     this.handleRowDescription = this.cursor.handleRowDescription.bind(this.cursor)
@@ -19,40 +17,25 @@ class PgQueryStream extends Readable {
     this.handleError = this.cursor.handleError.bind(this.cursor)
   }
 
-  submit (connection) {
+  submit(connection) {
     this.cursor.submit(connection)
   }
 
-  close (callback) {
-    this._closed = true
-    const cb = callback || (() => this.emit('close'))
-    this.cursor.close(cb)
+  _destroy(_err, cb) {
+    this.cursor.close((err) => {
+      cb(err || _err)
+    })
   }
 
-  _read (size) {
-    if (this._reading || this._closed) {
-      return false
-    }
-    this._reading = true
-    const readAmount = Math.max(size, this.batchSize)
-    this.cursor.read(readAmount, (err, rows) => {
-      if (this._closed) {
-        return
-      }
+  // https://nodejs.org/api/stream.html#stream_readable_read_size_1
+  _read(size) {
+    this.cursor.read(size, (err, rows, result) => {
       if (err) {
-        return this.emit('error', err)
-      }
-      // if we get a 0 length array we've read to the end of the cursor
-      if (!rows.length) {
-        this._closed = true
-        setImmediate(() => this.emit('close'))
-        return this.push(null)
-      }
-
-      // push each row into the stream
-      this._reading = false
-      for (var i = 0; i < rows.length; i++) {
-        this.push(rows[i])
+        // https://nodejs.org/api/stream.html#stream_errors_while_reading
+        this.destroy(err)
+      } else {
+        for (const row of rows) this.push(row)
+        if (rows.length < size) this.push(null)
       }
     })
   }
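
With cleanup delegated to `_destroy` above, early termination now goes through the standard `stream.destroy()` path and the 'close' event (via `emitClose: true`) instead of the removed `close(callback)` method. A hedged sketch of that consumer-side migration, again with placeholder query text and environment-based connection settings:

const pg = require('pg')
const QueryStream = require('pg-query-stream')

const pool = new pg.Pool()

pool.connect().then((client) => {
  const stream = client.query(new QueryStream('SELECT * FROM generate_series(0, 2000) num'))
  stream.on('data', (row) => {
    // stop early once we have what we need; _destroy closes the cursor
    if (row.num >= 10) stream.destroy()
  })
  // 'close' fires after the underlying cursor has been closed
  stream.on('close', async () => {
    client.release()
    await pool.end()
  })
})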

packages/pg-query-stream/test/async-iterator.es6

+55
@@ -54,4 +54,59 @@ describe('Async iterator', () => {
     assert.equal(allRows.length, 603)
     await pool.end()
   })
+
+  it('can break out of iteration early', async () => {
+    const pool = new pg.Pool({ max: 1 })
+    const client = await pool.connect()
+    const rows = []
+    for await (const row of client.query(new QueryStream(queryText, [], { batchSize: 1 }))) {
+      rows.push(row)
+      break;
+    }
+    for await (const row of client.query(new QueryStream(queryText, []))) {
+      rows.push(row)
+      break;
+    }
+    for await (const row of client.query(new QueryStream(queryText, []))) {
+      rows.push(row)
+      break;
+    }
+    assert.strictEqual(rows.length, 3)
+    client.release()
+    await pool.end()
+  })
+
+  it('only returns rows on first iteration', async () => {
+    const pool = new pg.Pool({ max: 1 })
+    const client = await pool.connect()
+    const rows = []
+    const stream = client.query(new QueryStream(queryText, []))
+    for await (const row of stream) {
+      rows.push(row)
+      break;
+    }
+    for await (const row of stream) {
+      rows.push(row)
+    }
+    for await (const row of stream) {
+      rows.push(row)
+    }
+    assert.strictEqual(rows.length, 1)
+    client.release()
+    await pool.end()
+  })
+
+  it('can read with delays', async () => {
+    const pool = new pg.Pool({ max: 1 })
+    const client = await pool.connect()
+    const rows = []
+    const stream = client.query(new QueryStream(queryText, [], { batchSize: 1 }))
+    for await (const row of stream) {
+      rows.push(row)
+      await new Promise((resolve) => setTimeout(resolve, 1))
+    }
+    assert.strictEqual(rows.length, 201)
+    client.release()
+    await pool.end()
+  })
 })

packages/pg-query-stream/test/close.js

+55 −19
@@ -4,49 +4,85 @@ var concat = require('concat-stream')
 var QueryStream = require('../')
 var helper = require('./helper')
 
+if (process.version.startsWith('v8.')) {
+  return console.error('warning! node versions less than 10lts no longer supported & stream closing semantics may not behave properly');
+}
+
 helper('close', function (client) {
   it('emits close', function (done) {
-    var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [3], {batchSize: 2, highWaterMark: 2})
+    var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [3], { batchSize: 2, highWaterMark: 2 })
     var query = client.query(stream)
-    query.pipe(concat(function () {}))
+    query.pipe(concat(function () { }))
     query.on('close', done)
   })
 })
 
 helper('early close', function (client) {
   it('can be closed early', function (done) {
-    var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [20000], {batchSize: 2, highWaterMark: 2})
+    var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [20000], { batchSize: 2, highWaterMark: 2 })
     var query = client.query(stream)
    var readCount = 0
     query.on('readable', function () {
       readCount++
       query.read()
     })
     query.once('readable', function () {
-      query.close()
+      query.destroy()
     })
     query.on('close', function () {
       assert(readCount < 10, 'should not have read more than 10 rows')
       done()
     })
   })
-})
 
-helper('close callback', function (client) {
-  it('notifies an optional callback when the conneciton is closed', function (done) {
-    var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [10], {batchSize: 2, highWaterMark: 2})
-    var query = client.query(stream)
-    query.once('readable', function () { // only reading once
-      query.read()
-    })
-    query.once('readable', function () {
-      query.close(function () {
-        // nothing to assert. This test will time out if the callback does not work.
-        done()
+  it('can destroy stream while reading', function (done) {
+    var stream = new QueryStream('SELECT * FROM generate_series(0, 100), pg_sleep(1)')
+    client.query(stream)
+    stream.on('data', () => done(new Error('stream should not have returned rows')))
+    setTimeout(() => {
+      stream.destroy()
+      stream.on('close', done)
+    }, 100)
+  })
+
+  it('emits an error when calling destroy with an error', function (done) {
+    var stream = new QueryStream('SELECT * FROM generate_series(0, 100), pg_sleep(1)')
+    client.query(stream)
+    stream.on('data', () => done(new Error('stream should not have returned rows')))
+    setTimeout(() => {
+      stream.destroy(new Error('intentional error'))
+      stream.on('error', (err) => {
+        // make sure there's an error
+        assert(err);
+        assert.strictEqual(err.message, 'intentional error');
+        done();
       })
+    }, 100)
+  })
+
+  it('can destroy stream while reading an error', function (done) {
+    var stream = new QueryStream('SELECT * from pg_sleep(1), basdfasdf;')
+    client.query(stream)
+    stream.on('data', () => done(new Error('stream should not have returned rows')))
+    stream.once('error', () => {
+      stream.destroy()
+      // wait a bit to let any other errors shake through
+      setTimeout(done, 100)
     })
-    query.on('close', function () {
-      assert(false, 'close event should not fire') // no close event because we did not read to the end of the stream.
-    })
+  })
+
+  it('does not crash when destroying the stream immediately after calling read', function (done) {
+    var stream = new QueryStream('SELECT * from generate_series(0, 100), pg_sleep(1);')
+    client.query(stream)
+    stream.on('data', () => done(new Error('stream should not have returned rows')))
+    stream.destroy()
+    stream.on('close', done)
+  })
+
+  it('does not crash when destroying the stream before its submitted', function (done) {
+    var stream = new QueryStream('SELECT * from generate_series(0, 100), pg_sleep(1);')
+    stream.on('data', () => done(new Error('stream should not have returned rows')))
+    stream.destroy()
+    stream.on('close', done)
   })
 })
packages/pg-query-stream/test/config.js

+1-3
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,7 @@ var assert = require('assert')
22
var QueryStream = require('../')
33

44
var stream = new QueryStream('SELECT NOW()', [], {
5-
highWaterMark: 999,
65
batchSize: 88
76
})
87

9-
assert.equal(stream._readableState.highWaterMark, 999)
10-
assert.equal(stream.batchSize, 88)
8+
assert.equal(stream._readableState.highWaterMark, 88)
